/**
  ETFAna project, Anyang Normal University && IMP-CAS
  \class ETFQueue
  \brief a fifo 
  \author SUN Yazhou, asia.rabbit@163.com
  \since 2023-06-08
  \date 2023-06-08 last modified
  \attention
  changelog
  <table>
  <tr>  <th>Date         <th>Author      <th>Description                    </tr>
  <tr>  <td>2023-06-08   <td>Asia Sun    <td>file created                   </tr>
  </table>

  \copyright Copyright (c) 2021-2024 Anyang Normal U. && IMP-CAS with LGPLv3 LICENSE
*/

#include <iostream>
#include <errno.h>
#include <cstdio>
#include <ctime>
#include <unistd.h>
#include <netinet/tcp.h> // to tell if an fd is closed
#include <netinet/in.h> // to tell if an fd is closed

#include "config.h"
#ifdef _ROOT_
#include <TSystem.h>
#endif

#include "ETFQueue.h"
#include "ETFMsg.h"
#include "section_head.h"
#include "vme_event_head.h"

using std::cout;
using std::endl;

// shorthand for the error reporter used throughout this file
#define er ETFMsg::Error
// HANDSHAKE: compile-time switch for acknowledged transfer. When defined,
// Add(fd) writes ONLINE_ACK back to the sender after each complete event, and
// Pop(fd) only releases a slot once fACK has been set (i.e., do not send the
// next event until the peer has acknowledged receipt of the last one).
// Currently disabled.
// #define HANDSHAKE

// static const int HEAD_LEN_PXI = sizeof(section_head) / sizeof(int);
// static const int HEAD_LEN_VME = sizeof(vme_event_head) / sizeof(int);
// SIGINT handler (defined near the end of this file, installed in the ctor):
// it calls ETFMsg::SetIRP() so that loops polling ETFMsg::irp() react to
// ctrl-C promptly
static void handler_queue(int sig, siginfo_t *info, void *context);

// Construct an empty queue for the given DAQ type.
// - picks the per-item capacity (in words) from the VME or PXI shm layout and
//   lets every item allocate its buffer;
// - initializes the two counting semaphores: fNEmpty starts at kMAX_ITEM_NUM
//   (all slots free), fNLoaded starts at 0 (nothing to pop);
// - installs handler_queue as the SIGINT handler;
// - puts the partial read/write bookkeeping into its "idle" state.
// An unknown daqType is reported through ETFMsg::Error.
ETFQueue::ETFQueue(DaqType daqType) : fDaqType(daqType), fRead(0), fWrite(0),
fMutex(PTHREAD_MUTEX_INITIALIZER), fACK(false), fNAdded(0){
  // slot capacity depends on the DAQ flavour //
  if(kPXI == fDaqType) fMAX_ITEM_LEN = online_shm_pxi::MAX_LEN;
  else if(kVME == fDaqType) fMAX_ITEM_LEN = online_shm_vme::MAX_LEN;
  else er("ETFQueue", "ctor: invalid daq type");
  for(auto &item : fItem) item.initialize(fMAX_ITEM_LEN); // allocate memories
  fMAX_ITEM_SIZE = fMAX_ITEM_LEN * sizeof(int); // capacity in bytes

  // the two counting semaphores share the same init-and-report logic //
  const auto initSem = [](sem_t &sem, unsigned value){
    if(-1 == sem_init(&sem, 1, value))
      er("ETFQueue", "ctor: sem_init - %s", strerror(errno));
  };
  initSem(fNEmpty, kMAX_ITEM_NUM); // every slot is free at start
  initSem(fNLoaded, 0); // nothing loaded yet

  // route SIGINT to handler_queue (3-argument sa_sigaction form) //
  sigemptyset(&fsa.sa_mask);
  fsa.sa_sigaction = handler_queue;
  fsa.sa_flags = SA_SIGINFO; // specifying using sa_sigaction explicitly
  if(-1 == sigaction(SIGINT, &fsa, nullptr))
    er("ETFQueue", "ctor: sigaction err - %s", strerror(errno));

  // bookkeeping of partially transferred events: idle state //
  fNLastWritten = 0;
  fNHasWritten = fNLastWritten;
  fNLastRead = fNHasWritten;
  fNHasRead = fNLastRead;
  fNLeftToWrite = fMAX_ITEM_SIZE;
  fNLeftToRead = fNLeftToWrite;
  fWriteDone = true;
  fReadDone = fWriteDone;
  fEvSizeWrite = -1; // -1: no event in flight
  fEvSizeRead = fEvSizeWrite;
} // end ctor

// nothing is released here: the semaphores are destroyed explicitly in Close()
ETFQueue::~ETFQueue() = default;

// add an item from src to the queue
void ETFQueue::Add(const void *src){ // called by ETFReadShm::read()
  if(!NEmpty()) ETFMsg::Warn("ETFQueue", "Add(src): the QUEUE is FULL!"
    "Too small the queue (%d) or too large the DAQ rate", kMAX_ITEM_NUM);
  if(-1 == sem_wait(&fNEmpty))
    if(EINTR != errno) er("ETFQueue", "Add: sem_wait - %s", strerror(errno));
  int s = pthread_mutex_lock(&fMutex);
  if(0 != s) er("ETFQueue", "Add: pthread_mutex_lock error - %s", strerror(s));

  // first word stores ev_len in words, valid both for PXI and VME in online mode
  int sz = ((int *)src)[0] * sizeof(int); // total ev size
  // 1st word stores ev_len, not accounted for in an PXI section
  if(kPXI == fDaqType) sz += sizeof(int);
  memcpy(fItem[fWrite].buf, src, sz);
  
  fItem[fWrite].size = sz; // in bytes
  fItem[fWrite].idx = fItem[fWrite].buf[1]; // event index
  fWrite = kMAX_ITEM_NUM == fWrite + 1 ? 0 : fWrite + 1; // complete the cycle

  s = pthread_mutex_unlock(&fMutex);
  if(0 != s) er("ETFQueue", "Add: pthread_mutex_unlock error - %s", strerror(s));
  if(-1 == sem_post(&fNLoaded)) er("ETFQueue", "Add: sem_post - %s", strerror(errno));
} // end member function Add

// pop out an item from the queue to pointer dst for ana, called by ETFDaqData::Get()
// idx: index of the obtained event
void ETFQueue::Pop(void *dst, unsigned &idx){
  if(-1 == sem_wait(&fNLoaded)) er("ETFQueue", "Pop: sem_wait - %s", strerror(errno));
  int s = pthread_mutex_lock(&fMutex);
  if(0 != s) er("ETFQueue", "Pop: pthread_mutex_lock error - %s", strerror(s));

  memcpy(dst, fItem[fRead].buf, fItem[fRead].size);
  idx = fItem[fRead].buf[1];
  fRead = kMAX_ITEM_NUM == fRead + 1 ? 0 : fRead + 1;

  s = pthread_mutex_unlock(&fMutex);
  if(0 != s) er("ETFQueue", "Pop: pthread_mutex_unlock error - %s", strerror(s));
  if(-1 == sem_post(&fNEmpty)) er("ETFQueue", "Pop: sem_post - %s", strerror(errno));
} // end member function Pop

// add an item from file descriptor to the queue
// \retval: true: receive one event from LAN
bool ETFQueue::Add(int fd){ // called by ETFClient::recv()
  //----------------- now we start reading --------------------//
  if(!NEmpty()) ETFMsg::Warn("ETFQueue", "Add(fd): the QUEUE is FULL!"
    "Too small the queue (%d) or too large the DAQ rate", kMAX_ITEM_NUM);
  if(-1 == sem_wait(&fNEmpty))
    if(EINTR != errno) er("ETFQueue", "Add: sem_wait - %s", strerror(errno));
  int s = pthread_mutex_lock(&fMutex);
  if(0 != s) er("ETFQueue", "Add: pthread_mutex_lock error - %s", strerror(s));

  int nr = 0; // nof bytes per read and write
  char *p = reinterpret_cast<char *>(fItem[fWrite].buf);
  if(-1 == fEvSizeRead){ // read ev_size
    if(fReadDone){
      fNHasRead = 0;
      // if(fItem[fWrite].size) memset(p, 0, fItem[fWrite].size);
      fNLeftToRead = sizeof(int); // read 1st word -- the ev_len
      fReadDone = false;
    } // end if
    // read the first word -- ev_len in words
    nr = read(fd, p + fNHasRead, fNLeftToRead);
    if(-1 == nr){ // nonblocking - ignore EAGAIN
      if(EAGAIN != errno){
        ETFMsg::Info("ETFQueue", "Add: read(1st) fd %d error: %s", fd, strerror(errno));
        ETFMsg::SetIRP(); // exit normally
        fReadDone = true;
      } // end if
    } // end if
    else{
      fNHasRead += nr; fNLeftToRead -= nr;
      if(fNLeftToRead <= 0){ // read ev_size complete
        if(fNLeftToRead < 0) er("ETFQueue",
          "Read: (ev_size) read more than requested____: %d", fNLeftToRead);
        fNLeftToRead = fEvSizeRead = ((int *)p)[0] * sizeof(int); // ev_size in byte
        if(kVME == fDaqType) fNLeftToRead -= sizeof(int); // ev_len has been read
        else fEvSizeRead += sizeof(int); // tot size = ev_len + 1(ev_len itself)
      } // end inner if
    } // end else
  } // end if
  else if(fNLeftToRead > 0){ // read the data zone
    // loop until all the required bytes are read //
    nr = read(fd, p + fNHasRead, fNLeftToRead);
    if(-1 == nr){ // nonblocking - ignore EAGAIN
      if(EAGAIN != errno){
        ETFMsg::Info("ETFQueue", "Add: read fd %d error: %s", fd, strerror(errno));
        ETFMsg::SetIRP();
        fReadDone = true;
      } // end if
    } // end if
    else if(nr > 0){
      fNHasRead += nr; fNLeftToRead -= nr;
      // check if trailing consecutive 0 are met - solid evidence of end-of-event //
    } // end else
    if(fNLeftToRead <= 0){ // reading completed, waiting for acknowledgement
      if(fNLeftToRead < 0) ETFMsg::Error("ETFQueue",
        "Read: (ev) read more than requested____: %d", fNLeftToRead);
#ifdef HANDSHAKE
      static const online_cmd ACK = ONLINE_ACK;
      int nw = write(fd, &ACK, sizeof(online_cmd));
      if(sizeof(online_cmd) == nw){ // write succeeded
#endif
        fItem[fWrite].size = fEvSizeRead;
        fItem[fWrite].idx = fItem[fWrite].buf[1];
        fEvSizeRead = -1;
        fReadDone = true;
        fWrite = kMAX_ITEM_NUM == fWrite + 1 ? 0 : fWrite + 1; // complete the cycle
        fNAdded++; // for beam monitoring
#ifdef HANDSHAKE
      } // end if(ack)
      else if(-1 == nw){ // nonblocking - ignore EAGAIN
        if(EAGAIN == errno) er("ETFQueue", "Add(fd): write cmd not completed in 1 write");
        else{
          ETFMsg::Info("ETFQueue", "Add: write(ack) fd %d error: %s", fd, strerror(errno));
          ETFMsg::SetIRP();
        } // end if
      } // end else if
#endif
    } // end if(left<=0)
  } // end else

  s = pthread_mutex_unlock(&fMutex);
  if(0 != s) er("ETFQueue", "Add: pthread_mutex_unlock error - %s", strerror(s));
  if(fReadDone){
    if(-1 == sem_post(&fNLoaded)) er("ETFQueue", "Add: sem_post(load) - %s", strerror(errno));
  } // end if
  else{
    if(-1 == sem_post(&fNEmpty)) er("ETFQueue", "Add: sem_post(empty) - %s", strerror(errno));
  } // recover fNEmpty, because nothing is loaded

  return fReadDone;
} // end member function Add

// pop out an item from the queue to file-descriptor fd, called by ETFServer::send()
bool ETFQueue::Pop(int fd, size_t *psize){
  int nw = 0; // bytes per write
  while(-1 == sem_trywait(&fNLoaded)){
    if(EAGAIN == errno){
      // return true: so that while in main thread can exit
      if(ETFMsg::ms() % 100 == 0) if(isSockClosed(fd)) ETFMsg::SetIRP();
      if(ETFMsg::irp()) return true; // ctrl-C is pressed
      usleep(5); // wait for the queue to reload
      // cout << "hello, I'm here, " << ETFMsg::ms1() << '\r' << std::flush; // DEBUG
    } // end if
    else if(EINTR != errno) er("ETFQueue", "Pop(fd): sem_trywait - %s", strerror(errno));
    // return true: so that while in main thread can exit
    else return true; // also ctrl-C is pressed
  } // end while
  int s = pthread_mutex_lock(&fMutex);
  if(0 != s) er("ETFQueue", "Pop: pthread_mutex_lock error - %s", strerror(s));

  const char *p = reinterpret_cast<char *>(fItem[fRead].buf);
  // read the ev_size to write //
  if(-1 == fEvSizeWrite){
    fNHasWritten = 0;
    fNLeftToWrite = fEvSizeWrite = fItem[fRead].size;
    if(fWriteDone) fWriteDone = false;
  } // end if

  if(isSockClosed(fd)){ // sock closed in the server peer
    ETFMsg::SetIRP();
    fWriteDone = true; // fake complete, so as to exit while in ETFClient::send
    nw = 0;
  } // end if
  else if(fNLeftToWrite) nw = write(fd, p + fNHasWritten, fNLeftToWrite);
  if(-1 == nw && EAGAIN != errno){ // nonblocking - ignore EAGAIN
    ETFMsg::Info("ETFQueue", "Pop: write fd %d error: %s", fd, strerror(errno));
    ETFMsg::SetIRP(); // exit normally
    fWriteDone = true;
  } // end if
  else if(nw > 0){
    fNHasWritten += nw; fNLeftToWrite -= nw;
  } // end else
  if(fNLeftToWrite <= 0){ // writing completed, waiting for acknowledgement
    if(fNLeftToRead < 0) ETFMsg::Error("ETFQueue",
      "Read: (ev) read more than requested____: %d", fNLeftToRead);
    if(isSockClosed(fd)){ // sock closed in the server peer
      ETFMsg::SetIRP();
      fWriteDone = true; // fake complete, so as to exit while in ETFClient::send
    } // end if
#ifdef HANDSHAKE
    if(fACK){ // the receiver acknowledged the event
      fACK = false;
#endif
      // marking the success of the trasmission of the event //
      if(psize) *psize = fEvSizeWrite;
      fEvSizeWrite = -1;
      fRead = kMAX_ITEM_NUM == fRead + 1 ? 0 : fRead + 1; // complete the cycle
      fWriteDone = true;
#ifdef HANDSHAKE
    } // end if(ack)
#endif
  } // end if(left<=0)

  s = pthread_mutex_unlock(&fMutex);
  if(0 != s) er("ETFQueue", "Pop: pthread_mutex_unlock error - %s", strerror(s));
  if(fWriteDone){
    if(-1 == sem_post(&fNEmpty)) er("ETFQueue", "Pop: sem_post(empty) - %s", strerror(errno));
  } // end if
  else{
    if(-1 == sem_post(&fNLoaded)) er("ETFQueue", "Pop: sem_post(load) - %s", strerror(errno));
  } // recover fNEmpty, because nothing is loaded

  return fWriteDone;
} // end member function Pop

// Clean-up: destroy the two counting semaphores (free slots first, then
// loaded slots). Each failure is reported through ETFMsg::Error.
void ETFQueue::Close(){
  const int emptyRet = sem_destroy(&fNEmpty);
  if(emptyRet == -1) er("ETFQueue", "Close: sem_destroy - %s", strerror(errno));

  const int loadedRet = sem_destroy(&fNLoaded);
  if(loadedRet == -1) er("ETFQueue", "Close: sem_destroy - %s", strerror(errno));
} // end member function Close


// SIGINT handler, installed by the ETFQueue ctor through sigaction().
// Logs which signal arrived, then calls ETFMsg::SetIRP() so that the loops
// polling ETFMsg::irp() can terminate.
// NOTE(review): ETFMsg::Info/Warn presumably format and print text, which is
// not async-signal-safe - confirm.
void handler_queue(int sig, siginfo_t *info, void *context){
  (void)info; (void)context; // required by the sa_sigaction signature, unused
  if(SIGINT != sig) ETFMsg::Warn("handler_queue", "peculiar, not CTRL-C");
  else ETFMsg::Info("handler_queue", "CTRL-C PRESSED");
  ETFMsg::SetIRP(); // relay the SIGINT
} // end handler

// get socket status
// Queries TCP_INFO of sfd and returns its tcpi_state (TCP_ESTABLISHED, ...,
// see <netinet/tcp.h>).
// \retval: -1 if the state cannot be queried (getsockopt failed, e.g. sfd is
//   not a TCP socket); errno is left as set by getsockopt. Previously the
//   return value of getsockopt was ignored and an uninitialized state was
//   returned in that case.
int ETFQueue::sockState(int sfd){
  struct tcp_info info{}; // zero-initialized
  socklen_t len = sizeof(info); // proper type, no pointer cast needed
  if(-1 == getsockopt(sfd, IPPROTO_TCP, TCP_INFO, &info, &len)) return -1;
  return int(info.tcpi_state);
} // end member function sockState

// get socket status - string
// Maps the state code returned by sockState() (1-based) to a short name.
// \retval: "UNKNOWN" when the code is outside the table, which includes a
//   failed query; this used to index out of the array bounds.
const char *ETFQueue::sockStateC(int sfd){
  static const char *state[] = {
    "ESTABLISHED", "SENT", "RECV", "FIN_WAIT1", "FIN_WAIT2", "TIME_WAIT",
    "CLOSE", "CLOSE_WAIT", "LAST_ACK", "LISTEN", "CLOSING"
  };
  static const int nState = sizeof(state) / sizeof(state[0]);
  const int code = sockState(sfd); // valid codes start at 1
  if(code < 1 || code > nState) return "UNKNOWN";
  return state[code - 1];
} // end member function sockStateC

// Tell whether the socket sfd has been closed, judged from its TCP state.
// \retval: true if sockState(sfd) is TCP_CLOSE or TCP_CLOSE_WAIT
bool ETFQueue::isSockClosed(int sfd){
  switch(sockState(sfd)){
    case TCP_CLOSE:
    case TCP_CLOSE_WAIT:
      return true;
    default:
      return false;
  } // end switch
} // end member function isSockClosed

void ETFQueue::Reset(){
  int s = pthread_mutex_lock(&fMutex);
  if(0 != s) er("ETFQueue", "Reset: pthread_mutex_lock error - %s", strerror(s));

  if(-1 == sem_init(&fNLoaded, 1, 0))
    er("ETFQueue", "Reset: sem_init - %s", strerror(errno));
  if(-1 == sem_init(&fNEmpty, 1, kMAX_ITEM_NUM))
    er("ETFQueue", "Reset: sem_init - %s", strerror(errno));
  // for(auto &i : fItem) i.initialize(fMAX_ITEM_LEN); // allocate memories

  s = pthread_mutex_unlock(&fMutex);
  if(0 != s) er("ETFQueue", "Reset: pthread_mutex_unlock error - %s", strerror(s));
} // end member function Reset
